package com.offcn.bigdata.spark.homework

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable.ArrayBuffer
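/*
    3. For each tag, compute the number of companies and the number of job postings
       (output only the 50 tags with the highest hiring demand).
    Sample result format:
    高级,5232,1414
    金融,4865,995
    资深,3717,1080
    Java,3531,1154
    大数据,3375,831
    ........
 */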
/**
 * Spark job that aggregates recruitment records per tag and prints, for the tags
 * with the most job postings, the number of postings and the number of distinct
 * companies that published them.
 *
 * Each input line is expected to hold 10 fields separated by `^` (see [[Occupation]]);
 * lines that do not match are skipped. The input path can be passed as the first
 * program argument; when absent, the original default path is used.
 */
object HomeWork2 {
    /** Input file read when no path is given on the command line. */
    private val DefaultInputPath = "file:/E:/data/spark/core/lagou.txt"

    /** Number of `^`-separated fields a well-formed line must contain. */
    private val FieldCount = 10

    /** How many tags to print; the task statement asks for the top 50. */
    private val TopN = 50

    /**
     * Entry point.
     *
     * @param args optional; `args(0)` overrides the input path
     */
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
            .setAppName(s"${HomeWork2.getClass.getSimpleName}")
            .setMaster("local[*]")

        val sc = new SparkContext(conf)

        // Always release the context, even when a stage fails.
        try {
            val inputPath = args.headOption.getOrElse(DefaultInputPath)

            val occupations: RDD[Occupation] = sc.textFile(inputPath)
                .flatMap(line => parseOccupation(line).toList)

            // Every occupation is one posting of one company. Key by the (tag, company)
            // pair — a tuple instead of a "tag|company" string, so that tags or company
            // names that are empty or contain '|' cannot corrupt or crash the grouping.
            val postingsPerTagAndCompany = occupations.flatMap { occu =>
                occu.tag.split(",")
                    .map(_.trim)
                    .filter(_.nonEmpty) // a blank entry is not a tag
                    .map(tag => ((tag, occu.company), 1))
            }.reduceByKey(_ + _)

            // Each (tag, company) pair contributes exactly one company and its postings.
            val result = postingsPerTagAndCompany
                .map { case ((tag, _), jobs) => (tag, JobResult(tag, 1, jobs)) }
                .reduceByKey { (left, right) =>
                    JobResult(left.tag, left.company + right.company, left.job + right.job)
                }

            println("标签\t招聘量\t公司数量")
            // takeOrdered returns the smallest elements, so reverse to get the most postings first.
            val byPostingsDesc = Ordering.by[(String, JobResult), Int](_._2.job).reverse
            result.takeOrdered(TopN)(byPostingsDesc).foreach { case (tag, jobResult) =>
                println(s"${tag}\t${jobResult.job}\t${jobResult.company}")
            }
        } finally {
            sc.stop()
        }
    }

    /**
     * Parses one input line.
     *
     * @param line raw text line with fields separated by `^`
     * @return the parsed record, or `None` when the line does not have exactly
     *         [[FieldCount]] fields or its first field is not an integer
     */
    private def parseOccupation(line: String): Option[Occupation] = {
        val fields = line.split("\\^")
        if (fields.length != FieldCount) {
            None
        } else {
            // A non-numeric id marks a malformed line; skip it instead of failing the job.
            scala.util.Try(fields(0).trim.toInt).toOption.map { id =>
                Occupation(id, fields(1), fields(2), fields(3), fields(4),
                    fields(5), fields(6), fields(7), fields(8), fields(9))
            }
        }
    }
}
/**
 * Counters accumulated for a single tag.
 *
 * @param tag     the tag these counters belong to
 * @param company company counter; HomeWork2 sums it while reducing by tag
 * @param job     job-posting counter; HomeWork2 sums it while reducing
 */
case class JobResult(
    tag: String,
    company: Int,
    job: Int
)
/**
 * One parsed input record. HomeWork2 fills the fields, in declaration order, from the
 * ten `^`-separated columns of an input line.
 *
 * @param id      first column, parsed as an integer
 * @param job     second column (presumably the job title — not used in this file)
 * @param addr    third column (presumably the location — not used in this file)
 * @param tag     comma-separated list of tags; HomeWork2 splits it on ","
 * @param company company value; HomeWork2 groups postings by tag and company
 * @param salary  sixth column (presumably the salary — not used in this file)
 * @param edu     seventh column (presumably the education requirement — not used in this file)
 * @param exp     eighth column (presumably the experience requirement — not used in this file)
 * @param `type`  ninth column (meaning not visible in this file)
 * @param level   tenth column (meaning not visible in this file)
 */
case class Occupation(
    id: Int,
    job: String,
    addr: String,
    tag: String,
    company: String,
    salary: String,
    edu: String,
    exp: String,
    `type`: String,
    level: String
)